/**
  ETFAna project, Anyang Normal University && IMP-CAS
  \class ETFReadShm
  \brief read data from daq shm, and write to ETFQueue for further processing
  Supposed to be a tool class
  \author SUN Yazhou, asia.rabbit@163.com
  \since 2023-06-09
  \date 2023-06-09 last modified
  \attention
  changelog
  <table>
  <tr>  <th>Date         <th>Author      <th>Description                    </tr>
  <tr>  <td>2023-06-09   <td>Asia Sun    <td>file created                   </tr>
  </table>

  \copyright Copyright (c) 2021-2024 Anyang Normal U. && IMP-CAS with LGPLv3 LICENSE
*/

#include <iostream>
#include <unistd.h>

#include "config.h"
#ifdef _ROOT_
#include <TSystem.h>
#endif

#include "online_shm.h"
#include "ETFReadShm.h"
#include "ETFQueue.h"
#include "ETFShm.h"
#include "ETFMsg.h"

using std::cout;
using std::endl;

#define er ETFMsg::Error

// default ctor: no shm is attached, the shm pointers and geometry are all zeroed
ETFReadShm::ETFReadShm()
: ETFDaqData(), fShm(nullptr), fBuf(nullptr), fMAX_NUM(0), fMAX_LEN(0)
{
} // end default ctor

// SIGINT handler installed by the ctor, to facilitate immediate reaction to ctrl-C
void handler_readShm(int sig, siginfo_t *info, void *context);
void cleanRead(void *v); // releases mutex and posts nstored (undoes what read() left half-done)

// Attach to (or create) the daq shm, make sure its semaphores are sane,
// and install handler_readShm as the SIGINT handler.
// name: the posix shm name to read from
// daqType: the daq type, which selects the shm layout (PXI or VME)
// isCreate: whether to create shm if not exists
ETFReadShm::ETFReadShm(const string &name, DaqType daqType, bool isCreate)
: ETFDaqData(daqType), fShm(0), fBuf(0), fMAX_NUM(0), fMAX_LEN(0){
  fQueue = new ETFQueue(fDaqType);
  CreateShm(name, isCreate);
  // inspect the integrity of the online shm: it is (re)initialized //
  // unless nempty + nstored adds up to fMAX_NUM //
  const int tot = ETFReadShm::peeksem(fBuf->nempty) + ETFReadShm::peeksem(fBuf->nstored);
  if(isCreate || tot != fMAX_NUM) InitShm();
  else CleanShm(); // recover last abnormal exit if any
  // relay SIGINT to ETFMsg through handler_readShm //
  // sa_mask and sa_flags are set explicitly: a three-argument sa_sigaction handler
  // is only honoured with SA_SIGINFO, and sigaction() reads both fields
  sigemptyset(&fsa.sa_mask);
  fsa.sa_flags = SA_SIGINFO;
  fsa.sa_sigaction = handler_readShm;
  if(-1 == sigaction(SIGINT, &fsa, nullptr))
    er("ETFReadShm", "ctor: sigaction err - %s", strerror(errno));
} // end ctor

 // name: name of the daq shm, e.g., online_shm_VME
void ETFReadShm::CreateShm(const string &name, bool isCreate){
  // the shm object is uniquely identified by its name
  fShm = new ETFShm(name, fDaqType, isCreate);
  // view the raw shm buffer through the common online_shm layout
  fBuf = reinterpret_cast<online_shm *>(fShm->buf());

  // pick up the section count and section length of the matching daq layout;
  // both members are left untouched for any other daq type
  if(kVME == fDaqType){
    fMAX_LEN = online_shm_vme::MAX_LEN;
    fMAX_NUM = online_shm_vme::MAX_NUM;
    return;
  } // end if
  if(kPXI == fDaqType){
    fMAX_LEN = online_shm_pxi::MAX_LEN;
    fMAX_NUM = online_shm_pxi::MAX_NUM;
  } // end if
} // end member function CreateShm

// (re)initialize the shared memory
// original author's note: only for debug purposes, never use in formal applications
// reinit: if true, the shm mutex is acquired first (by polling), and the queue
//         is reset through Reset() once the shm has been initialized
void ETFReadShm::InitShm(bool reinit){
  if(reinit){
    // poll the mutex until it is acquired //
    for(;;){
      if(-1 != sem_trywait(&fBuf->mutex)) break; // got it
      const bool busy = EAGAIN == errno; // still locked by someone else
      const bool interrupted = EINTR == errno; // a signal cut in
      if(!busy && !interrupted) er("ETFReadShm", "InitShm: sem_wait - %s", strerror(errno));
    } // end for
  } // end if

  // delegate to the initializer of the layout matching the daq type //
  if(kPXI == fDaqType){
    static_cast<online_shm_pxi *>(fBuf)->initialize(fShm->GetName());
  } else if(kVME == fDaqType){
    static_cast<online_shm_vme *>(fBuf)->initialize(fShm->GetName());
  } // end if

  if(reinit){
    Reset(); // reset the queue
  } // end if
} // end member function InitShm

// dtor: all the teardown is delegated to Close()
ETFReadShm::~ETFReadShm()
{
  this->Close();
} // end dtor

// undone read() has done to the semaphores, so as to avoid a deadlock by sem_wait
void cleanRead(void *v){
  // check mutex: if locked, unlock it, or we cannot do anything //
  online_shm *p = reinterpret_cast<online_shm *>(v);
  // usually program crashes at fQueue->Add() in read()
  if(ETFReadShm::peeksem(p->mutex) <= 0){
    // so we just undone what has been done to the semaphores
    if(-1 == sem_post(&p->mutex)) er("cleanRead", "sem_post(mutex) error: %s", strerror(errno));
    // from the start of read() to this place
    if(-1 == sem_post(&p->nstored)) er("cleanRead", "sem_post(nstored) error: %s", strerror(errno));
  } // end if -- note that this may be susceptible to error
} // end function cleanRead
// pthread only accepts static function, which can be averted by using avatars
void *ETFReadShm::avatar(void *rshm){
  ETFReadShm *sm = static_cast<ETFReadShm *>(rshm);
  pthread_cleanup_push(cleanRead, sm->buf());
  sm->read();
  pthread_cleanup_pop(1);
  return nullptr;
} // end member function avatar
void ETFReadShm::Initialize(){
  CleanShm();
  // listen to cmds from the client
  pthread_create(&fRecv, nullptr, ETFReadShm::avatar, (void *)this);
  fIsRecvThAlive = true;
  ETFMsg::Info("ETFReadShm(%s)",
    "Initialize: reading-from-daq thread is now online", fDaqC);
} // end member function Initialize

// restore the semaphores of the attached shm: a thin wrapper of cleanRead()
void ETFReadShm::CleanShm()
{
  void *shm = buf();
  cleanRead(shm);
} // end member function CleanShm

// keep moving events from daq shm (fBuf) into fQueue, 1 event per loop iteration,
// until ETFMsg::irp() turns true
void ETFReadShm::read(){ // called by the ever-ticking readin pthread
  if(!fShm) er("ETFReadShm", "read: fShm not assigned.");
  if(!fQueue) er("ETFReadShm", "read: fQueue not assigned.");

  int *sec = nullptr; // pointer to the shm section holding the event to read
  // always acquires data from daq as long as there is a chance //
  while(!ETFMsg::irp()){
    // wait for a stored event, then lock the shm - order matters //
    // NOTE(review): a sem_wait interrupted by a signal (EINTR) is reported through er - confirm this is intended
    if(-1 == sem_wait(&fBuf->nstored)) er("ETFReadShm", "read: sem_wait(nstored) - %s", strerror(errno));
    if(-1 == sem_wait(&fBuf->mutex)) er("ETFReadShm", "read: sem_wait(mutex) - %s", strerror(errno));

    // obtain the data buffer pointer corresponding to the daq type //
    if(kPXI == fDaqType) sec = static_cast<online_shm_pxi *>(fBuf)->section[fBuf->p_read];
    else if(kVME == fDaqType) sec = static_cast<online_shm_vme *>(fBuf)->section[fBuf->p_read];
    else er("ETFReadShm", "read: unknown fDaqType");

    // hand the section over to the queue while the shm is still locked //
    fQueue->Add(sec);
    // advance the read index, wrapping to 0 after the last section //
    fBuf->p_read = fMAX_NUM == fBuf->p_read + 1 ? 0 : fBuf->p_read + 1;

    // unlock the shm and mark the section as empty again //
    if(-1 == sem_post(&fBuf->mutex)) er("ETFReadShm", "read: sem_post(mutex) - %s", strerror(errno));
    if(-1 == sem_post(&fBuf->nempty)) er("ETFReadShm", "read: sem_post(nempty) - %s", strerror(errno));
    if(ETFMsg::irp()) return;

    // print the status of the shm //
    // lastTime is shared by all calls (function-local static, starts from 0) //
    static time_t lastTime;
    if(ETFMsg::ms() - lastTime >= 1000){ // in ms
      lastTime = ETFMsg::ms();
      cout << "ETFReadShm: ";
      cout << (daqType() == kPXI ? "PXI" : "VME");
      cout << " nstored: \033[32;1m" << nstored() << "\033[0m";
      cout << ", nempty: \033[32;1m" << nempty() << "\033[0m";
      cout << "\t\033[36;1m" << ETFMsg::sec0() << "\033[0m" << endl;
    } // end if
  } // end while
} // end member function read

// write 1 event to daq shm - sim daq to debug online
void ETFReadShm::write(const void *src){
  if(nempty() <= 0) return; // DO NOT block daq //
  if(-1 == sem_wait(&fBuf->nempty)) er("ETFReadShm", "write: sem_wait(nempty) - %s", strerror(errno));
  if(-1 == sem_wait(&fBuf->mutex)) er("ETFReadShm", "write: sem_wait(mutex) - %s", strerror(errno));

  // src[0] stores ev_len in words, both for PXI and VME online
  int size = ((int*)src)[0] * sizeof(int);
  if(kPXI == fDaqType) size += sizeof(int); // true ev_len + 1st word for PXI
  if(size > fMAX_LEN) size = fMAX_LEN; // so as to avoid segmentation vialation

  int *sec = nullptr; // data pointer to write at
  if(kPXI == fDaqType) sec = static_cast<online_shm_pxi *>(fBuf)->section[fBuf->p_write];
  else if(kVME == fDaqType) sec = static_cast<online_shm_vme *>(fBuf)->section[fBuf->p_write];
  else er("ETFReadShm", "write: unknown fDaqType");

  memcpy(sec, src, size);
  fBuf->p_write = fMAX_NUM == fBuf->p_write + 1 ? 0 : fBuf->p_write + 1;

  if(-1 == sem_post(&fBuf->mutex)) er("ETFReadShm", "write: sem_post - %s", strerror(errno));
  if(-1 == sem_post(&fBuf->nstored)) er("ETFReadShm", "write: sem_post - %s", strerror(errno));
} // end member function write

void ETFReadShm::Close(){
  // close the recv thread //
  if(fIsRecvThAlive){
    pthread_cancel(fRecv);
    const int s = pthread_join(fRecv, nullptr); // wait for fRecv to be destructed
    if(0 != s) ETFMsg::Error("ETFReadShm", "Close: pthread_join err - %s", strerror(s));
    fIsRecvThAlive = false;
  } // end if

  if(fShm){ delete fShm; fShm = nullptr; }
  if(fQueue){ delete fQueue; fQueue = nullptr; }
} // end member function Close

// print the shm name together with its nstored and nempty counts, in one colored line
void ETFReadShm::ShmStatus(){
  cout << "\033[35;1m__" << fShm->GetName();
  cout << " --nstored: " << nstored();
  cout << " --nempty: " << nempty();
  cout << "__\033[0m" << endl;
} // end member function ShmStatus

int ETFReadShm::peeksem(sem_t &s){ // utility function, for gdb
  int v = -9999;
  const int i = sem_getvalue(&s, &v);
  if(-1 == i) er("ETFReadShm", "peeksem: sem_getvalue error: %s", strerror(errno));
  return v;
} // end utility function peeksem

// to tell if the named shm exists - simply forwards to ETFShm::exist
// name: the name to look up
bool ETFReadShm::exist(const string &name){
  const bool found = ETFShm::exist(name);
  return found;
} // end member function exist

// signal handler: logs which signal arrived, then raises the ETFMsg interrupt flag
// sig: the signal number; info and context are not used
// NOTE(review): logging from a signal handler may not be async-signal-safe - confirm
void handler_readShm(int sig, siginfo_t *info, void *context){
  if(SIGINT != sig){
    ETFMsg::Warn("handler_readShm", "peculiar, not CTRL-C");
  } else{
    ETFMsg::Info("handler_readShm", "CTRL-C PRESSED");
  } // end if
  ETFMsg::SetIRP(); // relay the SIGINT
} // end handler
